home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Aminet 1 (Walnut Creek)
/
Aminet - June 1993 [Walnut Creek].iso
/
aminet
/
comm
/
misc
/
xqsrc1_7.lzh
/
library
/
queue.c
< prev
next >
Wrap
C/C++ Source or Header
|
1993-03-12
|
22KB
|
1,044 lines
/*
* Name: queue.c
*
* Description: Queue management functions for xferq.library
*
* Copyright: 1992-1993 by David Jones.
*
* Distribution:
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to:
*
* The Free Software Foundation David Jones
* 675 Mass Ave 6730 Tooney Drive
* Cambridge, MA Orleans, Ontario
* 02139 K1C 6R4
* USA Canada
*
* Usenet: gnu@prep.ai.mit.edu aa457@freenet.carleton.ca
* Fidonet: 1:163/109.8
*
* $Log: $
*
*/
#include <exec/types.h>
#include <exec/lists.h>
#include <exec/nodes.h>
#include <exec/semaphores.h>
#include <utility/tagitem.h>
#include <proto/exec.h>
#include <proto/utility.h>
#include <proto/dos.h>
#include "xferq.h"
#include "xferqint.h"
#include "xferq_pragmas.h"
#include "execlists.h"
/*
* Variables
*/
struct SignalSemaphore QueueLock;
struct MinList QueueList;
BOOL QueueScanned;
extern struct Library *UtilityBase;
struct EligibleWorkData {
struct ExtSessWalk esw;
struct WorkNode *wn;
BOOL incLocked;
short minPri, maxPri;
};
struct PendingWorkData {
struct ExtSessWalk esw;
BOOL pending;
};
struct FindWorkData {
struct ExtSessWalk esw;
struct WorkNode *wn;
char *name;
};
void InitQueue(void)
/*
* Does: Initializes the queue manager data structures.
*/
{
InitSemaphore(&QueueLock);
NEWLIST(&QueueList);
#ifdef DEBUG
kprintf("QueueList = %08lx\n", &QueueList);
#endif
QueueScanned = FALSE;
}
struct SiteNode *NewSiteQueue(struct NetAddress *site)
/*
* In: site Site to create queue for
*
* Does: Allocates a new site queue and links it in to the global
* structure.
*/
{
struct SiteNode *sn;
sn = AllocObject(sizeof(struct SiteNode), XQO_SITENODE);
if (!sn) {
return NULL;
}
sn->site = XfqCopyObject(site);
NEWLIST(&sn->workList);
sn->flags = 0;
sn->numSessions = 0;
sn->numLocks = 0;
ADDHEAD(&QueueList, sn);
return sn;
}
void DropSiteNode(struct SiteNode *sn)
/*
* In: sn Pointer to site node to drop
*
* Does: Frees memory for all work nodes queued to site node.
*/
{
struct WorkNode *wn, *twn;
wn = FIRST(&sn->workList);
while (twn = NEXT(wn)) {
XfqDropObject(wn);
wn = twn;
}
}
void DropQueue(void)
/*
* Does: Frees memory for all queue nodes.
*/
{
struct SiteNode *sn, *tsn;
sn = FIRST(&QueueList);
while (tsn = NEXT(sn)) {
DropSiteNode(sn);
FreeObject(sn);
sn = tsn;
}
InitQueue();
}
struct SiteNode *FindSiteQueue(struct NetAddress *addr)
/*
* In: addr Address to find site queue for
*
* Does: Finds the site queue corresponding to the supplied address.
* If not found, this function tries to create
* a new site queue. The function returns a pointer to a site
* queue, or NULL if error.
*/
{
struct SiteNode *sn;
if (!QueueScanned) {
ScanQueue();
if (!QueueScanned) {
return NULL;
}
}
sn = FIRST(&QueueList);
while (NEXT(sn)) {
if (!XfqCmpAddressTags(addr, sn->site,
XQ_Mandatory, XQADDR_ALLBUTUSER,
TAG_DONE)) {
if (sn->flags & XQSITE_UNREAD) {
ReadQueue(sn);
if (sn->flags & XQSITE_UNREAD) {
return NULL;
}
}
return sn;
}
sn = NEXT(sn);
}
sn = NewSiteQueue(addr);
return sn;
}
void ExtWalkSession(void *object, struct ExtSessWalk *esw)
/*
* In: object Object to walk
* esw Pointer to control data
*
* Does: Performs actions similar to XfqWalkSessionCallBack(), but
* which are more useful to the queue functions: for each site
* node, data is filled in in the esw structure.
*/
{
struct SiteNode *sq;
struct SessNode *sn, *tsn;
if (!object) {
/* Walk whole system */
sq = FIRST(&QueueList);
while (NEXT(sq)) {
esw->sn = sq;
esw->na = sq->site;
if (!(*esw->func)(esw)) {
return;
}
sq = NEXT(sq);
}
}
else if (ObjectType(object) == XQO_ADDRESS) {
/* Access single address */
esw->na = object;
esw->sn = FindSiteQueue(object);
if (!esw->sn) {
return;
}
(*esw->func)(esw);
}
else if (ObjectType(object) == XQO_SESSION) {
/* Walk session */
sn = FIRST(&((struct Session *)object)->sessList);
while (tsn = NEXT(sn)) {
esw->na = sn->addr;
esw->sn = FindSiteQueue(sn->addr);
if (!esw->sn) {
return;
}
if (!(*esw->func)(esw)) {
return;
}
sn = tsn;
}
}
else {
Error(XQERROR_BADOBJECT);
}
}
BOOL EligibleWorkCB(struct EligibleWorkData *ewd)
/*
* In: ewd Pointer to working data
*
* Does: Searches for eligible work and stores a pointer to it in
* the hook data area.
*/
{
struct WorkNode *wn;
wn = FIRST(&ewd->esw.sn->workList);
#ifdef DEBUG
kprintf("EW sn=%08lx pl=%08lx ph=%08lx\n", ewd->esw.sn,
ewd->minPri, ewd->maxPri);
#endif
while (NEXT(wn)) {
#ifdef DEBUG
kprintf("EW node W=%08lx SF=%02lx UF=%02lx ST=%ld, P=%ld\n",
wn, wn->sysFlags, wn->userFlags, wn->status, wn->node.ln_Pri);
#endif
if (wn->status == XQ_UNSENT && !(wn->userFlags & XQ_SENDLATER) &&
(ewd->incLocked || !(wn->sysFlags & XQ_LOCKED)) &&
wn->node.ln_Pri >= ewd->minPri &&
wn->node.ln_Pri <= ewd->maxPri) {
ewd->wn = wn;
#ifdef DEBUG
kprintf("EW found %08lx.\n", wn);
#endif
return FALSE;
}
wn = NEXT(wn);
}
#ifdef DEBUG
kprintf("EW found nothing.\n");
#endif
return TRUE;
}
struct WorkNode *EligibleWork(void *object, struct TagItem *tags)
/*
* In: object Object to find eligible work for.
* tags Taglist controlling scope of search.
*
* Does: Finds work that can be sent. If incLocked is TRUE, then locked
* work is also considered, even though it cannot be sent.
*/
{
struct EligibleWorkData ewd;
ewd.incLocked = GetTagData(XQ_IncLocked, FALSE, tags);
ewd.minPri = GetTagData(XQ_MinPri, XQ_MINPRI, tags);
ewd.maxPri = GetTagData(XQ_MaxPri, XQ_MAXPRI, tags);
#ifdef DEBUG
kprintf("Entered EligibleWork(%08lx,%ld)\n", object, ewd.incLocked);
#endif
ewd.wn = NULL;
ewd.esw.func = EligibleWorkCB;
ExtWalkSession(object, &ewd);
#ifdef DEBUG
kprintf("Left EligibleWork\n");
#endif
return ewd.wn;
}
struct WorkNode *EligibleWorkTags(void *object, Tag tag, ...)
{
return EligibleWork(object, (struct TagItem *)&tag);
}
BOOL AnyDirty(void)
/*
* Does: Returns TRUE if any of the queues are dirty, FALSE otherwise.
*/
{
struct SiteNode *sn;
sn = FIRST(&QueueList);
while (NEXT(sn)) {
if (sn->flags & XQSITE_DIRTY) {
return TRUE;
}
sn = NEXT(sn);
}
return FALSE;
}
struct WorkNode *LockWork(struct WorkNode *wn)
/*
* In: wn Pointer to node to lock.
*
* Does: If node is locked, then set error code and return NULL.
* Otherwise assert lock bit and return pointer to node.
*/
{
if (wn->sysFlags & XQ_LOCKED) {
Error(XQERROR_LOCKED);
return NULL;
}
else {
wn->sysFlags |= XQ_LOCKED;
return wn;
}
}
void __saveds __asm LIBUnlockWork(register __a0 struct WorkNode *wn)
/*
* In: wn A0 Pointer to node to unlock
*
* Does: Clears the lock bit in the work node.
*/
{
ObtainSemaphore(&QueueLock);
wn->sysFlags &= ~XQ_LOCKED;
ReleaseSemaphore(&QueueLock);
}
BOOL PendingWorkCB(struct PendingWorkData *pwd)
/*
* In: pwd Pointer to working data
*
* Does: Searches for pending work and stores a pointer to it in
* the hook data area.
*/
{
if (pwd->esw.sn->numLocks) {
pwd->pending = TRUE;
return FALSE;
}
else {
return TRUE;
}
}
ULONG __saveds __asm LIBAnyWork(register __a0 void *object,
register __a1 struct TagItem *tags)
/*
* In: object A0 Pointer to object to check work for
* tags A1 Taglist giving priority window
*
* Does: Checks to see if any work is queued to the object in question.
* The function returns one of the following:
*
* NO_WORK No work is queued up
* LOCKED_WORK Work is queued, but other users have locked it
* UNLOCKED_WORK Work is queued and unlocked
*
* The object passed in may be a session or an address. If it's
* a session, the function checks every address in the session
* for work.
*/
{
ULONG result;
struct PendingWorkData pwd;
ObtainSemaphore(&QueueLock);
#ifdef DEBUG
kprintf("In AnyWork - ObjectType %ld\n", ObjectType(object));
#endif
if (EligibleWorkTags(object,
XQ_IncLocked, FALSE,
TAG_MORE, tags)) {
result = UNLOCKED_WORK;
}
else if (EligibleWorkTags(object,
XQ_IncLocked, TRUE,
TAG_MORE, tags)) {
result = LOCKED_WORK;
}
else {
pwd.pending = FALSE;
pwd.esw.func = PendingWorkCB;
ExtWalkSession(object, &pwd);
if (pwd.pending) {
result = PENDING_WORK;
}
else {
result = NO_WORK;
}
}
ReleaseSemaphore(&QueueLock);
#ifdef DEBUG
kprintf("Left AnyWork result %ld.\n", result);
#endif
SetErrorTags(tags);
return result;
}
BOOL FindWorkCB(struct FindWorkData *fwd)
/*
* In: fwd Pointer to state info
*
* Does: Searches for the name in the given queue and aborts the walk
* if found.
*/
{
fwd->wn = (struct WorkNode *)FindNameNoCase(&fwd->esw.sn->workList,
fwd->name);
/*
* Abort walk only if something found.
*/
if (fwd->wn) {
return FALSE;
}
else {
return TRUE;
}
}
struct WorkNode *__saveds __asm LIBFindWork(
register __a0 struct TagItem *tags)
/*
* In: tags A0 Pointer to tag list
*
* Does: Searches the queue for work satisfying the conditions set by
* the tag list. If a work node is found, its lock bit is set
* and its address is returned. Otherwise, the function returns
* NULL. Valid tags are:
*
* XQ_Name Fully-pathed filename of file (mandatory)
* XQ_Site Site that file was sent to (optional)
*
* If XQ_Site is not given, all sites in the queue are searched.
* If the file is being sent to more than one site, you'll never
* find out.
*
* If XQ_Site is given, it may be either an address or a session.
* If a session, all sites on the session are searched.
*/
{
struct FindWorkData fwd;
void *site;
char *name;
struct WorkNode *wn;
name = (char *)GetTagData(XQ_Name, NULL, tags);
site = (void *)GetTagData(XQ_Site, NULL, tags);
/*
* The name is stored fully pathed so it must be searched fully
* pathed.
*/
fwd.name = FullPath(name);
if (!fwd.name) {
SetErrorTags(tags);
return NULL;
}
fwd.wn = NULL;
fwd.esw.func = FindWorkCB;
ObtainSemaphore(&QueueLock);
ExtWalkSession(site, &fwd);
if (fwd.wn) {
wn = LockWork(fwd.wn);
}
else {
Error(XQERROR_NOEXIST);
wn = NULL;
}
ReleaseSemaphore(&QueueLock);
XfqDropObject(fwd.name);
SetErrorTags(tags);
return wn;
}
struct WorkNode *__saveds __asm LIBGetWork(register __a0 void *object,
register __a1 struct TagItem *tags)
/*
* In: object A0 Pointer to address or session
* tags A1 Taglist giving priority window
*
* Does: Finds the next item of work that can be sent, locks it and
* returns it. If object is an address, then only that queue
* is searched. If object is a session, then all addresses in
* the session are searched.
*
* Criteria for transmission include:
* - Work must not be locked.
* - Work must be UNSENT.
* - Work must be within priority bounds of taglist
* - Work must be higher priority than other eligible work
* - Work must not be SENDLATER
*/
{
struct WorkNode *wn;
ObtainSemaphore(&QueueLock);
#ifdef DEBUG
kprintf("In GetWork - ObjectType %ld\n", ObjectType(object));
#endif
wn = EligibleWorkTags(object,
XQ_IncLocked, FALSE,
TAG_MORE, tags);
if (wn) {
LockWork(wn);
}
else {
Error(XQERROR_NOEXIST);
}
ReleaseSemaphore(&QueueLock);
#ifdef DEBUG
kprintf("Left GetWork with %08lx.\n", wn);
#endif
SetErrorTags(tags);
return wn;
}
ULONG __saveds __asm LIBRemoveWork(register __a0 struct WorkNode *wn)
/*
* In: wn A0 Pointer to node to remove
*
* Does: Removes the node from its queue so that NO queue management
* function can see it. XfqAddWork() can be used to re-insert
* the node.
*
* If the file has been sent and deletion or truncation has been
* specified, then kiss the file good-bye.
*/
{
ULONG result;
ObtainSemaphore(&QueueLock);
if (wn->status == XQ_SENT) {
if (wn->userFlags & XQ_DELETE) {
DeleteFile(wn->node.ln_Name);
if (IoErr()) {
Error(XQERROR_DOS);
result = XQRM_ERROR;
}
else {
result = XQRM_DELETED;
}
}
else if (wn->userFlags & XQ_TRUNCATE) {
result = TruncateFile(wn->node.ln_Name);
}
else {
result = XQRM_SENT;
}
}
else {
result = XQRM_UNSENT;
}
REMOVE(wn);
wn->site->flags |= XQSITE_DIRTY;
wn->sysFlags &= ~XQ_INQUEUE;
ReleaseSemaphore(&QueueLock);
return result;
}
LONG __saveds __asm LIBMaxSitePri(register __a0 void *object)
/*
* In: object A0 Pointer to address or session
*
* Does: Determines the maximum priority of any work described by
* the supplied object. Object may be either a site or a session.
*/
{
struct WorkNode *wn;
LONG result;
ObtainSemaphore(&QueueLock);
wn = EligibleWorkTags(object,
XQ_IncLocked, FALSE,
TAG_DONE);
if (wn) {
result = wn->node.ln_Pri;
}
else {
result = XQ_NOPRI;
}
ReleaseSemaphore(&QueueLock);
return result;
}
void __saveds __asm LIBWalkQueueCallBack(
register __a0 struct NetAddress *addr,
register __a1 struct TagItem *tags,
register __a4 void *globals)
/*
* In: addr A0 Pointer to address to walk through
* tags A1 Pointer to tag list
* globals A4 Pointer to caller's global area
*
* Does: Calls a user-supplied hook function for each entry in the
* queue. The function must return TRUE to continue the walk,
* FALSE otherwise. Valid tags are:
*
* XQ_Reverse Set to TRUE to walk from low-to-high priority
* XQ_CallBack Data is pointer to hook structure.
*
* Note: Access to all queues in the system is disabled while this
* function executes. Be quick!
*/
{
struct SiteNode *sn;
struct WorkNode *wn, *twn;
struct Hook *hook;
BOOL reverse, incLocked;
hook = (struct Hook *)GetTagData(XQ_CallBack, NULL, tags);
if (!hook) {
Error(XQERROR_NOTAG);
SetErrorTags(tags);
return;
}
reverse = (BOOL)GetTagData(XQ_Reverse, FALSE, tags);
incLocked = (BOOL)GetTagData(XQ_IncLocked, FALSE, tags);
ObtainSemaphore(&QueueLock);
sn = FindSiteQueue(addr);
if (!sn) {
Error(XQERROR_NOEXIST);
SetErrorTags(tags);
return;
}
if (reverse) {
wn = LAST(&sn->workList);
while (twn = PREV(wn)) {
if (incLocked || !(wn->sysFlags & XQ_LOCKED)) {
if (!CallHookRes(hook, wn, tags, globals)) {
break;
}
}
wn = twn;
}
}
else {
wn = FIRST(&sn->workList);
while (twn = NEXT(wn)) {
if (incLocked || !(wn->sysFlags & XQ_LOCKED)) {
if (!CallHookRes(hook, wn, tags, globals)) {
break;
}
}
wn = twn;
}
}
SetErrorTags(tags);
ReleaseSemaphore(&QueueLock);
}
ULONG __saveds __asm LIBAddWork(register __a0 struct NetAddress *addr,
register __a1 struct WorkNode *wn)
/*
* In: addr A0 Address to add work to
* wn A1 Work node to add
*
* Does: Enqueues the work onto the given queue.
*
*/
{
struct SiteNode *sn;
struct FindWorkData fwd;
ObtainSemaphore(&QueueLock);
/*
* If IMMEDIATE and no session is up, then don't add work.
*/
if ((wn->userFlags & XQ_IMMEDIATE) && !XfqSessionUp(addr)) {
Error(XQERROR_NOSESSION);
ReleaseSemaphore(&QueueLock);
return FALSE;
}
/*
* If SENDLATER and no session is up, then clear the SENDLATER bit.
*/
if ((wn->userFlags & XQ_SENDLATER) && !XfqSessionUp(addr)) {
wn->userFlags &= ~XQ_SENDLATER;
}
/*
* Check to see if work is already in the queue. This is the
* only reliable way to perform this check.
*/
fwd.name = wn->node.ln_Name;
fwd.wn = NULL;
fwd.esw.func = FindWorkCB;
ExtWalkSession(addr, &fwd);
if (fwd.wn) {
Error(XQERROR_QUEUED);
ReleaseSemaphore(&QueueLock);
return FALSE;
}
/*
* Find place to add work.
*/
sn = FindSiteQueue(addr);
if (!sn) {
ReleaseSemaphore(&QueueLock);
return FALSE;
}
ENQUEUE(&sn->workList, wn);
/*
* Fix up links to address, siteNode and dirty queue.
*/
wn->site = sn;
if (wn->addr) {
XfqDropObject(wn->addr);
}
XfqCopyObject(sn->site);
wn->addr = sn->site;
wn->sysFlags |= XQ_INQUEUE | XQ_LOCKED;
sn->flags |= XQSITE_DIRTY;
ReleaseSemaphore(&QueueLock);
return TRUE;
}
BOOL PreExamWork(struct WorkNode *wn)
/*
* In: wn Work node to examine.
*
* Does: Locks the queue for data integrity.
*/
{
ObtainSemaphore(&QueueLock);
return TRUE;
}
BOOL PostExamWork(struct WorkNode *wn)
/*
* In: wn Work node to examine.
*
* Does: Releases lock after examining node.
*/
{
ReleaseSemaphore(&QueueLock);
return TRUE;
}
BOOL PreModifyWork(struct WorkNode *wn)
/*
* In: wn Work node to modify
*
* Does: This function is called before a work node is modified. It
* removes the node from the queue for mutex purposes.
*/
{
ObtainSemaphore(&QueueLock);
/*
* If the node is in the queue, then remove it.
*/
if (wn->sysFlags & XQ_INQUEUE) {
REMOVE(wn);
}
return TRUE;
}
BOOL PostModifyWork(struct WorkNode *wn)
/*
* In: wn Pointer to modified work
*
* Does: Inserts the work node into the proper spot on the queue.
*/
{
char *name;
/*
* Compute full path of name
*/
name = wn->node.ln_Name;
wn->node.ln_Name = FullPath(name);
XfqDropObject(name);
if (!wn->node.ln_Name) {
ReleaseSemaphore(&QueueLock);
return FALSE;
}
/*
* If the node was in the queue at the time the modification started,
* then re-insert into the queue.
*/
if (wn->sysFlags & XQ_INQUEUE) {
XfqAddWork(wn->addr, wn);
}
ReleaseSemaphore(&QueueLock);
return TRUE;
}
struct Session *__saveds __asm LIBGetSiteList(void)
/*
* Does: Creates a private session whose addresses are those that have
* work queued to them.
*/
{
struct Session *session;
struct SessNode *sn;
struct SiteNode *sq, *tsq;
session = XfqCreateObjectTags(XQO_SESSION,
XQ_SessFlags, XQSESS_PRIVATE,
TAG_DONE);
if (!session) {
return NULL;
}
ObtainSemaphore(&QueueLock);
if (!QueueScanned) {
ScanQueue();
if (!QueueScanned) {
ReleaseSemaphore(&QueueLock);
return NULL;
}
}
sq = FIRST(&QueueList);
while (tsq = NEXT(sq)) {
if (sq->flags & XQSITE_UNREAD) {
ReadQueue(sq);
if (sq->flags & XQSITE_UNREAD) {
XfqDropObject(session);
ReleaseSemaphore(&QueueLock);
return NULL;
}
}
if (NEXT(FIRST(&sq->workList))) {
sn = AllocObject(sizeof(struct SessNode), XQO_SESSNODE);
if (sn) {
sn->addr = sq->site;
XfqCopyObject(sq->site);
ADDHEAD(&session->sessList, sn);
}
else {
XfqDropObject(session);
ReleaseSemaphore(&QueueLock);
return NULL;
}
}
sq = tsq;
}
ReleaseSemaphore(&QueueLock);
return session;
}
ULONG __saveds __asm LIBAddWorkQuick(register __a0 char *site,
register __a1 char *name, register __a2 char *asname,
register __d0 LONG pri, register __d1 LONG flags)
/*
* In: site A0 Address STRING to add work to
* name A1 Name of file to add
* asname A2 Name to send file as
* pri D0 Priority to send file at
* flags D1 Flags controlling file handling
*
* Does: This is a quick-and-dirty addition function. It parses the
* address string, creates a work node and adds it to the queue.
* Function returns TRUE if it worked.
*/
{
struct NetAddress *addr;
struct WorkNode *wn;
ULONG *errp;
addr = XfqGetAddressTags(site, NULL,
XQ_Mandatory, XQADDR_2D,
XQ_Optional, XQADDR_ANYTHING,
TAG_DONE);
#ifdef DEBUG
kprintf("AddWorkQuick address = %08lx\n", addr);
#endif
if (!addr) {
return FALSE;
}
/* Create the work node */
wn = XfqCreateObjectTags(XQO_WORKNODE,
XQ_Name, name,
XQ_AsName, asname,
XQ_Pri, pri,
XQ_Flags, flags,
TAG_DONE);
if (!wn) {
XfqDropObject(addr);
return FALSE;
}
/* Add the work node */
XfqAddWork(addr, wn);
if (!(wn->sysFlags & XQ_INQUEUE)) {
XfqDropObject(addr);
return FALSE;
}
XfqUnlockWork(wn);
/* The address is no longer needed */
XfqDropObject(addr);
return TRUE;
}
void __saveds __asm LIBFlushQueue(register __a0 void *object)
/*
* In: object A0 Pointer to object to flush
*
* Does: Writes one or more site queues out to disk. If object is an
* address, then only that site queue will be flushed. If object
* is a session, then all addresses in the session will be flushed.
* If object is NULL, then all queues are flushed.
*/
{
struct ExtSessWalk esw;
ObtainSemaphore(&QueueLock);
esw.func = WriteQueue;
ExtWalkSession(object, &esw);
ReleaseSemaphore(&QueueLock);
}
void SessionDownQueue(struct SiteNode *sn)
/*
* In: sn Site node to scan for dead work
*
* Does: All sessions just went down, so remove all IMMEDIATE nodes and
* clear the SENDLATER bits.
*/
{
struct WorkNode *wn, *twn;
wn = FIRST(&sn->workList);
while (twn = NEXT(wn)) {
wn->userFlags &= ~XQ_SENDLATER;
if (wn->userFlags & XQ_IMMEDIATE) {
wn->status=XQ_SENT;
XfqRemoveWork(wn);
XfqDropObject(wn);
}
wn = twn;
}
}
ULONG __saveds __asm LIBHoldMailer(register __a0 struct NetAddress *addr)
/*
* In: addr A0 Address to lock mailer for.
*
* Does: Puts a mailer on hold by setting a flag that affects the result of
* XfqAnyWork(). This function is used to delay a mailer session
* while work is being created.
*/
{
struct SiteNode *sn;
ObtainSemaphore(&QueueLock);
sn = FindSiteQueue(addr);
if (sn) {
sn->numLocks++;
}
ReleaseSemaphore(&QueueLock);
return (sn) ? (ULONG)TRUE : (ULONG)FALSE;
}
ULONG __saveds __asm LIBReleaseMailer(register __a0 struct NetAddress *addr)
/*
* In: addr A0 Address to release mailer for.
*
* Does: Releases a hold requested by XfqHoldMailer().
*/
{
struct SiteNode *sn;
ULONG result;
ObtainSemaphore(&QueueLock);
sn = FindSiteQueue(addr);
if (sn && sn->numLocks > 0) {
sn->numLocks--;
result = TRUE;
}
else {
result = FALSE;
}
ReleaseSemaphore(&QueueLock);
return result;
}